
Data: Handle case where partition location is missing for TableMigrationUtil #12212

Open · wants to merge 3 commits into base: main

Conversation

@jshmchenxi (Contributor) commented Feb 10, 2025

When we use SnapshotTableSparkAction to create an Iceberg table from a Hive table, and the Hive table contains a partition whose location is missing from the file system, the Spark procedure fails with the following exception:

Caused by: java.lang.RuntimeException: Unable to list files in partition: s3://bucket/table/partition=foo
	at org.apache.iceberg.data.TableMigrationUtil.listPartition(TableMigrationUtil.java:206)
	at org.apache.iceberg.spark.SparkTableUtil.listPartition(SparkTableUtil.java:309)
	at org.apache.iceberg.spark.SparkTableUtil.lambda$importSparkPartitions$37333fc7$1(SparkTableUtil.java:767)
	at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:3484)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:375)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.FileNotFoundException: No such file or directory: s3://bucket/table/partition=foo
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3799)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3650)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:3373)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$null$22(S3AFileSystem.java:3344)
	at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$23(S3AFileSystem.java:3343)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2478)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2497)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:3342)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.iceberg.data.TableMigrationUtil.listPartition(TableMigrationUtil.java:167)
	... 32 more

This PR handles that case by treating the missing partition location as an empty directory.
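The core of the change is a guard around the directory listing: if the partition location does not exist, return an empty listing instead of letting the list call throw. The pattern can be sketched with plain `java.nio` (the actual patch uses Hadoop's `FileSystem.exists`/`listStatus`; `listPartitionFiles` below is a hypothetical helper for illustration, not the method in the patch):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MissingPartitionDemo {

  // Guard sketch: a missing partition directory yields an empty listing
  // instead of a FileNotFoundException from the list call.
  static List<Path> listPartitionFiles(Path partitionDir) throws IOException {
    if (!Files.exists(partitionDir)) {
      return Collections.emptyList();
    }
    try (Stream<Path> entries = Files.list(partitionDir)) {
      return entries.collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException {
    // A path that does not exist no longer blows up the import.
    System.out.println(listPartitionFiles(Path.of("/definitely/missing/partition=foo"))); // prints "[]"
  }
}
```

With Hadoop's API the same shape applies, only with `fs.exists(partitionDir)` and an empty `FileStatus[]` in place of the empty list.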

@github-actions github-actions bot added the data label Feb 10, 2025
@jshmchenxi jshmchenxi changed the title Handle case where partition location is missing from the file system in TableMigrationUtil Data: Handle case where partition location is missing for TableMigrationUtil Feb 10, 2025
@jshmchenxi jshmchenxi force-pushed the bugfix/spark-util-partition-location-nonexist branch 2 times, most recently from 5b7ff64 to 658010c Compare February 10, 2025 02:31
@manuzhang (Collaborator) commented:

@jshmchenxi Thanks for the fix. Can you add a test?

@RussellSpitzer (Member) left a comment:


Looks good although I agree we need a test to check that this is working as expected.

@jshmchenxi jshmchenxi force-pushed the bugfix/spark-util-partition-location-nonexist branch 2 times, most recently from 36e2b7b to f3c2e11 Compare February 11, 2025 06:52
@jshmchenxi (Contributor, Author) commented:

@manuzhang @RussellSpitzer Thanks for the suggestion! I've added test cases to cover this change.

@jshmchenxi jshmchenxi force-pushed the bugfix/spark-util-partition-location-nonexist branch 2 times, most recently from 60ef66a to 23a1b11 Compare February 11, 2025 06:56
@jshmchenxi jshmchenxi force-pushed the bugfix/spark-util-partition-location-nonexist branch from 23a1b11 to 6b96233 Compare February 11, 2025 07:04
@manuzhang (Collaborator) commented:

@jshmchenxi Can we add an end-to-end test in TestSnapshotTableAction?

@jshmchenxi jshmchenxi force-pushed the bugfix/spark-util-partition-location-nonexist branch from 6b96233 to 2aacf49 Compare February 16, 2025 02:34
@github-actions github-actions bot added the spark label Feb 16, 2025
@jshmchenxi (Contributor, Author) commented:

@jshmchenxi Can we add an end-to-end test in TestSnapshotTableAction?

@manuzhang I've added the end-to-end test. Please take a look.

@jshmchenxi (Contributor, Author) commented:

Kindly ping @manuzhang @RussellSpitzer @stevenzwu

    Arrays.stream(
        fs.exists(partitionDir)
            ? fs.listStatus(partitionDir, HIDDEN_PATH_FILTER)
            : new FileStatus[] {})
A reviewer (Member) commented on this snippet:

Shouldn't we log something here?

@jshmchenxi (Contributor, Author) replied:

Sounds good. I've added a log here.
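The added warning presumably looks something like the sketch below (the method and message are illustrative, not the exact ones in the patch); `java.util.logging` stands in here for the SLF4J logger that Iceberg actually uses, so the block stays self-contained:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Logger;

public class PartitionWarnDemo {

  private static final Logger LOG = Logger.getLogger(PartitionWarnDemo.class.getName());

  // Hypothetical helper: warn when the partition directory is absent,
  // and report whether it was missing so the caller can skip listing it.
  static boolean warnIfMissing(Path partitionDir) {
    if (!Files.exists(partitionDir)) {
      LOG.warning("Partition location " + partitionDir + " does not exist; treating it as empty");
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    warnIfMissing(Path.of("/missing/partition=foo"));
  }
}
```

Logging before falling back to the empty listing keeps the migration resilient without silently hiding the inconsistency between the Hive metastore and the file system.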

@jshmchenxi jshmchenxi force-pushed the bugfix/spark-util-partition-location-nonexist branch from 2aacf49 to 5e58ba9 Compare February 19, 2025 02:43
@jshmchenxi jshmchenxi force-pushed the bugfix/spark-util-partition-location-nonexist branch from 5e58ba9 to e0c9f62 Compare February 21, 2025 01:31
5 participants